package worker

import (
	"context"
	"distributed-task-scheduler/crontab/common"

	"go.etcd.io/etcd/clientv3"
)

// 分布式锁(TXN事务)
type JobLock struct {
	// etcd客户端
	kv    clientv3.KV
	lease clientv3.Lease

	jobName    string             // 任务名
	cancelFunc context.CancelFunc // 用于终止自动续租
	leaseId    clientv3.LeaseID   // 租约ID
	isLocked   bool               // 是否上锁成功
}

// 初始化一把锁
func InitJobLock(jobName string, kv clientv3.KV, lease clientv3.Lease) (jobLock *JobLock) {
	jobLock = &JobLock{
		kv:      kv,
		lease:   lease,
		jobName: jobName,
	}
	return
}

// 尝试上锁 (抢锁)
func (jobLock *JobLock) TryLock() (err error) {
	var (
		leaseGrantResp *clientv3.LeaseGrantResponse
		cancelCtx      context.Context
		cancelFunc     context.CancelFunc
		leaseId        clientv3.LeaseID
		keepRespChan   <-chan *clientv3.LeaseKeepAliveResponse
		txn            clientv3.Txn
		lockKey        string
		txnResp        *clientv3.TxnResponse
	)

	// 1.创建租约(5秒) 防止节点宕机，宕机后让锁释放掉
	if leaseGrantResp, err = jobLock.lease.Grant(context.TODO(), 5); err != nil {
		return // 租约并没有创建出来，错误就不必管了
	}
	// context用于取消自动续租
	cancelCtx, cancelFunc = context.WithCancel(context.TODO())
	// 租约ID
	leaseId = leaseGrantResp.ID

	// 2.自动续租
	if keepRespChan, err = jobLock.lease.KeepAlive(cancelCtx, leaseId); err != nil {
		goto FAIL
	}

	// 3.处理续租应答的协程
	go func() {
		var (
			keepResp *clientv3.LeaseKeepAliveResponse
		)
		for {
			select {
			case keepResp = <-keepRespChan: // 自动续租的应答
				if keepResp == nil { // 空指针说明租约被取消了
					goto END
				}
			}
		}
	END:
	}()

	// 4.创建事务txn
	txn = jobLock.kv.Txn(context.TODO())

	// 锁路径
	lockKey = common.JOB_LOCK_DIR + jobLock.jobName

	// 5.事务抢锁
	txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
		Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
		Else(clientv3.OpGet(lockKey))

	// 提交事务
	if txnResp, err = txn.Commit(); err != nil {
		goto FAIL // 网络请求可能失败，但是后端是否生效不能确定；能做的是把它彻底回滚掉
	}

	// 6.成功返回，失败释放租约
	if !txnResp.Succeeded { // 锁被占用
		err = common.ERR_LOCK_ALREADY_REQUIRED
		goto FAIL
	}

	// 抢锁成功
	jobLock.leaseId = leaseId
	jobLock.cancelFunc = cancelFunc
	jobLock.isLocked = true

	return

FAIL:
	// 保证事务不管成功还是失败，都能释放租约
	cancelFunc() // 取消自动续租
	jobLock.lease.Revoke(context.TODO(), leaseId) // 释放租约 (尽快释放租约，释放没有利用的资源)
	return
}

// 释放锁
func (jobLock *JobLock) Unlock() {
	if jobLock.isLocked {
		jobLock.cancelFunc()                                  // 取消我们程序自动续租的协程
		jobLock.lease.Revoke(context.TODO(), jobLock.leaseId) // 释放租约 (关联的key会被删掉)
	}
}
